Confluent спонсировал этот пост.
Современные платформы данных требуют контекста в реальном времени, чтобы извлечь значимую информацию. Поскольку агенты ИИ становятся все более распространенными, эта контекстуальная точность имеет решающее значение для минимизации галлюцинаций и обеспечения надежных результатов. Инженеры по данным, которые используют Python, один из самых популярных языков в мире, все чаще нуждаются в работе с Apache Kafka и Apache Flink для обработки потоковой передачи данных.
В то время как Python доминирует в разработке данных (удерживая место № 1 в рейтинге Tiobe и Pypl), Apache Kafka и Apache Flink написаны на Java. Тем не менее, отличные интеграции Python делают эти рамки беспроблемы для разработчиков Python, что позволяет им использовать эти мощные инструменты без необходимости глубоких знаний Java.
Почему Python доминирует в разработке данных
Популярность Python в инженерии данных не случайна; Есть порты Python, предлагаемые практически для каждой основной структуры данных, включая:
- Обработка потоков: Pyflink, Kafka python sdks
- Обработка партии: Pyspark, Apache Airflow, Dagster
- Манипулирование данными: Pyarrow, Python SDK для DuckDB
- Оркестровая рабочего процесса: Apache Airflow, префект
Эта обширная экосистема позволяет инженерам данных создавать сквозные трубопроводы, оставаясь в рамках знакомого синтаксиса и узоров Python. Если вам нужно обрабатывать потоки данных в реальном времени-например, для анализа поведения пользователей, обнаружения аномалий или предсказательного обслуживания, Python предоставляет инструменты, не заставляя вас переключать языки.
Apache Kafka: потоковое хранилище сделано «Pythonic»
Apache Kafka стала де-факто стандартом для платформ потоковой передачи данных, предлагая простые в использовании API, важные функции воспроизведения, поддержку схемы и исключительную производительность. В то время как Apache Kafka написана на Java, разработчики Python получают доступ к нему через Librdkafka, высокоэффективную реализацию C, которая обеспечивает готовность к производству.
Библиотека Confluent-Kafka-Python служит основным интерфейсом, предлагая безопасные потоки производителя, потребительские и административные классы, совместимые с версией брокеров Apache Kafka 0.8, включая слитую облачную и конфликтующую платформу. Установка проста: PIP установить Confluent-Kafka.
Реализация производителя
Вот как просто публиковать сообщения в Кафку:
Из Confluent_kafka Import Produce Produce P = Produceer ({‘bootstrap.servers’: ‘mybroker1, mybroker2’}) def Delivery_report (err, msg): «» «» Вызывается один раз для каждого сообщения, созданного для указания результата доставки. «» Если ошибка не: print (‘доставка сообщения не удалась:}’. [{}]’.format (msg.topic (), msg.partition ())) для данных в some_data_source: # Отчет о доставке запуска от предыдущих вызовов p.poll (0) # Асинхронно 1234567891011121314151617181920 из Confluent_kafka Import Producer P = Производитель ({‘bootstrap.servers’: ‘MyBroker1, MyBroker2’}) def Deliver_Report (err, msg): «», вызываемый один раз для каждого сообщения, созданного к указанному результату доставки. {} ‘. format (err)) else: print (‘ Сообщение, доставленное в {} [{}]’.format (msg.topic (), msg.partition ())) для данных в some_data_source: # Отчет о доставке запуска от предыдущих вызовов p.poll (0) # асинхронно
Внедрение потребителей
Потребление сообщений одинаково просты:
из Confluent_kafka Import Consumer C = Consumer ({‘bootstrap.servers’: ‘mybroker’, ‘group.id’: ‘mygroup’, ‘auto.offset.reset’: ‘Самый ранний’}) c.subscribe ([‘user_clicks’]В то время как true: msg = c.poll (1.0), если msg не является:) Продолжить, если msg.error (): print («Ошибка потребителя: {}». Format (msg.error ())) Продолжить print (‘Полученное сообщение: {}’. Format (msg.value (). Дефод (‘utf-8)) c.close () (). 12345678910111213141516171819202122 из Confluent_kafka Import Consumer C = потребитель ({‘bootstrap.servers’: ‘mybroker’, ‘Group.id’: ‘mygroup’, ‘auto.offset.reset’: ‘nevely’}) c.[‘user_clicks’]В то время как true: msg = c.poll (1.0), если msg не является:) Продолжить, если msg.error (): print («Ошибка потребителя: {}». Format (msg.error ())) Продолжить print (‘Полученное сообщение: {}’. Format (msg.value (). Дефод (‘utf-8)) c.close () ().
Клиент Confluent-Kafka-Python поддерживает паритет с Java SDK, обеспечивая максимальную производительность пропускной способности. Поскольку он поддерживается Confluent (который был основан создателем Кафки), он остается будущим и готовым к производству.
Apache Flink: обработка потока с Pyflink
В то время как Kafka превосходит при хранении потоков данных, обработка и обогащение этих потоков требует дополнительных инструментов. Apache Flink служит распределенным механизмом обработки для современных вычислений по сравнению с неограниченными и ограниченными потоками данных.
Pyflink предоставляет Python API, который позволяет инженерам данных создавать масштабируемые рабочие нагрузки пакетов и потоковой передачи, от трубопроводов в режиме реального времени до крупномасштабного исследовательского анализа, трубопроводов машинного обучения (ML) и извлечения, преобразования, нагрузки (ETL). Инженеры данных, знакомые с пандами
Pyflink APIS: выбор вашего уровня сложности
Pyflink предлагает два основных API:
Общий шаблон включает в себя применение агрегаций и операций с временным окном (падающие или прыгающие окна) к темам Kafka, а затем вывод результатов к темам ниже по течению. Например, преобразование темы «user_clicks» в резюме «top_users».
Преобразования в реальном времени в действии
Вот задача API API таблицы Pyflink, которая обрабатывает потоковые данные с оконными агрегациями:
от pyflink.datastream import streamexecutionEnvironment of pyflink.table Импорт среды, StreamtableEnvironment def main (): env = intreexecutionEnvironment.get_excution_environment () settings = evenmentsettings.in_mode_mode () incontableenverment.create. env.add_jars («Flink-sql-connector-kafka-4.0.0-2.0.jar») # Определите оконную агрегацию top_users_sql = «» «select user_id, count (curl) как cnt, window_start, window_end из таблицы (Tumble user_clicks, docriptor (procrime), vined ‘windowd), windowd winder) user_id «»» result = tenv.sql_query(top_users_sql) # Execute and sink results tenv.execute_sql(sink_ddl) 123456789101112131415161718192021222324252627282930 from pyflink.datastream import StreamExecutionEnvironmentfrom pyflink.table import Environmentsettings, StreamtableEnvironment def main (): env = streamexecutionEnvironment.get_execution_environment () settings = evenerseSettings.in_streaming_mode () tenv = StreamtableEnvironment.create (env, settings) # add kafka connector. env.add_jars («Flink-sql-connector-kafka-4.0.0-2.0.jar») # Определите оконную агрегацию top_users_sql = «» «select user_id, count (curl) как cnt, window_start, window_end из таблицы (Tumble user_clicks, docriptor (procrime), vined ‘windowd), windowd winder) user_id «» «» result = tenv.sql_query (top_users_sql) # Выполнить и погрузиться
Этот подход позволяет сложным вариантам использования, такими как:
- Анализ поведения пользователя из данных ClickStream
- Обнаружение аномалии в производственных процессах
- Предупреждающие предупреждения о техническом обслуживании из интернета вещей (IoT) телеметрия
Преимущество Python в современной потоковой передаче данных
Комбинация клиентов Pyflink и Python Kafka создает мощный инструментарий для инженеров, обученных Python. Вы можете внести свой вклад в модернизацию платформы данных без изучения Java, используя существующую экспертизу Python при доступе к потоковым возможностям предприятия.
Ключевые преимущества включают:
- Знакомый синтаксис: Оставаться в экосистеме Python
- Производственная производительность: Librdkafka и Flink’s Java Engine обеспечивают скорость предприятия
- Полный доступ к функциям: Нет компромисса в отношении возможностей кафки или фантазии
- Экосистемная интеграция: Бесполезное соединение с другими инструментами данных Python
Начало работы требует всего две установки PIP: PIP установить Confluent-Kafka и PIP установить Apache-Flink. Оттуда вы можете создать сложные конвейеры в реальном времени, которые конкурируют с любой реализацией Java.
Поскольку AI и аналитика в реальном времени продолжают управлять эволюцией платформы данных, инженеры Python Data, оснащенные навыками Kafka и Flink, расположены для того, чтобы привести к этому преобразованию. Барьеры между производительностью Python и производительности Java эффективно исчезли, что делает это идеальным временем для расширения вашего опыта потоковых данных.
Confluent, основанная оригинальными создателями Apache Kafka, впервые провели полную платформу потоковой передачи данных, которая транслирует, соединяет, обрабатывает и управляет данными, когда она течет по всему бизнесу. С Confluent любая организация может модернизировать свой бизнес и управлять им в режиме реального времени. Узнайте больше последних из Comfluent Trending Stories YouTube.com/ThenewStack Tech движется быстро, не пропустите эпизод. Подпишитесь на наш канал YouTube, чтобы транслировать все наши подкасты, интервью, демонстрации и многое другое. Группа подпишитесь с эскизом. Diptiman Raichaudhuri — адвокат разработчика штата в Confluent. Raichaudhuri — ветеран ИТ -индустрии с более чем двух десятилетий опыта работы в глобальных организациях по предоставлению услуг по продуктам и программным обеспечениям. В Confluent он тесно сотрудничает с разработчиками вокруг … Подробнее от Diptiman Raichaudhuri