当前位置: 首页 > news >正文

flinksql实践(从kafka读数据)

本案例是基于flinksql实现的,将逐步实现从kafka读写数据,聚合查询,关联维表(外部系统)等。

环境准备

        首先确保电脑已经安装好zookeeper、kafka、flink。本文flink使用单机模式,zookeeper和kafka也使用单机配置。(环境配置部分可以跳过)

        首先启动zookeeper

# 启动zookeeper
bin/zkServer.sh start# 关闭zookeeper
bin/zkServer.sh stop

        然后启动kafka

# 启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
# 关闭kafka 
bin/kafka-server-stop.sh -daemon config/server.properties

        启动flink集群,并打开sql客户端

# 启动flink集群
bin/start-cluster.sh
# 启动sql客户端
bin/sql-client.sh embedded -s yarn-session 
# 关闭flink集群
bin/stop-cluster.sh

 案例1:从kafka读取csv格式数据

数据准备

csv格式数据是以','进行分隔。本案例设计一个学生信息表,包含学生id,学生姓名,学生年级字段。

# 示范数据 
1,"zhangsan",2021
2,"lisi",2021
3,"wangwu",2024
4,"Bob",2021
5,"Lily",2022

kafka 主题相关命令

# 遍历已有topic
bin/kafka-topics.sh --bootstrap-server 192.168.137.201:9092 --list
# 生产者
bin/kafka-console-producer.sh --bootstrap-server node1:9092 --topic test
# 消费者
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test --from-beginning

创建kafka表

create table kafkatable (`stu_id` bigint,`stu_name` string,`grade` bigint
) with ('connector' = 'kafka','topic' = 'test','properties.bootstrap.servers' = 'node1:9092','properties.group.id' = 'testgroup','scan.startup.mode' = 'latest-offset','format' = 'csv'
)	

本案例读test分区的最新数据,如需修改,参考以下链接:

Kafka | Apache Flink

注意:kafka-flink连接器需要手动上传到flink安装目录下的lib目录下,官网有提供对应jar包。

案例实现

-- 实时查看kafkatable数据
select * from kafkatable;-- 查看不同年级学生数量
select grade,count(1) from kafkatable group by grade;

案例2:从kafka读取json格式数据

数据准备

{"stu_id": 1, "stu_name": "zhangsan", "grade": 2021}
{"stu_id": 2, "stu_name": "lisi", "grade": 2021}
{"stu_id": 3, "stu_name": "wangwu", "grade": 2024}
{"stu_id": 4, "stu_name": "Bob", "grade": 2021}
{"stu_id": 5, "stu_name": "Lily", "grade": 2022}

创建kafka表

create table student_info (`stu_id` bigint,`stu_name` string,`grade` bigint
) with ('connector' = 'kafka','topic' = 'test','properties.bootstrap.servers' = 'node1:9092','properties.group.id' = 'testgroup','scan.startup.mode' = 'latest-offset','format' = 'json'
);	

案例实现

-- 实时遍历
select * from student_info;

相关文章:

flinksql实践(从kafka读数据)

本案例是基于flinksql实现的,将逐步实现从kafka读写数据,聚合查询,关联维表(外部系统)等。 环境准备 首先确保电脑已经安装好zookeeper、kafka、flink。本文flink使用单机模式,zookeeper和kafka也使用单机配置。(环境配置部分可以…...

Linux系统:文件系统前言,详解CHSLBA地址

本节重点 理解硬盘的组成结构与工作原理理解柱面、扇区CHS地址进行寻址对整块硬盘的逻辑抽象LBA地址进行寻址LBA与CHS地址的相互转换 一、初识硬盘 1.1 基本概念 硬盘(Hard Disk Drive,HDD)是一种用于存储和检索数据的计算机硬件设备。它…...

2025年01月10日浙江鑫越系统科技前端面试

目录 vue2 和 vue3 的区别vue 怎么封装组件js 怎么把一个数组置空怎么组件自己调用自己的组件v-bind:attribute 和 v-bind“{attribute}” 的区别var let const 的区别this 指向作用域链闭包原型链事件循环 1. vue2 和 vue3 的区别 Vue 2 和 Vue 3 在多个方面存在区别&#…...

2025.05.11阿里云机考真题算法岗-第三题

📌 点击直达笔试专栏 👉《大厂笔试突围》 💻 春秋招笔试突围在线OJ 👉 笔试突围OJ 03. 镜像追踪游戏 问题描述 A先生正在玩一款名为「镜像追踪」的游戏。游戏在一个 n n n...

实景三维建模软件应用场景(众趣科技实景三维建模)

实景三维建模软件应用场景概述 实景三维建模软件,作为数字化时代的重要工具,不仅能够真实、立体、时序化地反映和表达物理世界,还为国家的基础设施建设和数字化发展提供了有力的支撑。 在测绘与地理信息领域,实景三维建模软件是构…...

centos中libc.so.6No such file的解决方式

你在运行安装程序时遇到了以下错误: Configuring the installer for this systems environment...strings: /lib/libc.so.6: No such file 这个错误通常是由于系统中缺少 glibc(GNU C Library)或其相关文件导致的。glibc 是 Linux 系统中的…...

MapReduce打包运行

1. 编写 MapReduce 程序 首先需要编写 MapReduce 程序,通常包含 Mapper、Reducer 和 Driver 类。例如,一个简单的 WordCount 程序: java import java.io.IOException; import java.util.StringTokenizer;import org.apache.hadoop.conf.Con…...

RustDesk:开源电脑远程控制软件

RustDesk:开源电脑远程控制软件 RustDesk:开源电脑远程控制软件一、RustDesk 简介二、下载教程2.1 桌面版下载2.2 Android 版下载 三、安装教程3.1 桌面版安装 四、功能讲解4.1 远程控制4.2 文件传输4.3 安全可靠4.4 自定义服务器 五、RustDesk技术架构解…...

【Unity】WebGL开发问题汇总

1 前言 主要记录下WebGL开发过程中遇到的各种问题。 2 问题 2.1 中文字体不显示 问题: 经典问题了。将项目打包在WebGL平台后,运行发现所用中文字体都不现实了。 解决办法: 替换Text组件的“字体”。可以将电脑字体复制到项目当中然后替换组…...

华为海思系列----昇腾张量编译器(ATC)模型转换工具----入门级使用指南(LINUX版)

由于官方SDK比较冗余且经常跨文档讲解且SDK整理的乱七八糟,对于新手来说全部看完上手成本较高,本文旨在以简短的方式介绍 CAFFE / ONNX 模型转 om 模型,并进行推理的全流程。希望能够帮助到第一次接触华为海思框架的道友们。大佬们就没必要看这种基础文章啦! 注:本…...

c++STL-list的模拟实现

cSTL-list的模拟实现 list源码剖析list模拟实现list构造函数拷贝构造函数赋值重载迭代器 iterator访问结点数size和判空尾插 push_back头插 push_front尾删pop_back头删pop_front插入 insert删除 erase清空clear和析构函数访问结点 参考程序 list源码剖析 建议先看cSTL-list的…...

RabbitMQ 核心概念与消息模型深度解析(二)

四、代码实战 了解了 RabbitMQ 的核心概念和消息模型后,接下来我们通过代码实战来进一步加深对它们的理解和掌握。下面将以 Java 和 Spring AMQP 为例,展示如何使用 RabbitMQ 进行消息的发送和接收。 4.1 环境准备 在开始编写代码之前,需要…...

JAVA研发+前后端分离,ZKmall开源商城B2C商城如何保障系统性能?

在电商行业竞争白热化的当下,B2C 商城系统的性能表现成为决定用户留存与商业成败的关键因素。ZKmall 开源商城凭借 Java 研发与前后端分离架构的深度融合,构建起一套高效、稳定且具备强大扩展性的系统架构,从底层技术到上层应用全方位保障性能…...

【android bluetooth 框架分析 02】【Module详解 6】【StorageModule 模块介绍】

1. 背景 我们在 gd_shim_module 介绍章节中&#xff0c;看到 我们将 StorageModule 模块加入到了 modules 中。 // system/main/shim/stack.cc modules.add<storage::StorageModule>();在 ModuleRegistry::Start 函数中我们对 加入的所有 module 挨个初始化。 而在该函…...

Datawhale 5月llm-universe 第1次笔记

课程地址&#xff1a;GitHub - datawhalechina/llm-universe: 本项目是一个面向小白开发者的大模型应用开发教程&#xff0c;在线阅读地址&#xff1a;https://datawhalechina.github.io/llm-universe/ 难点&#xff1a;配置conda环境变量 我用的vscode github方法 目录 重要…...

Linux架构篇、第五章git2.49.0部署与使用

Linux_架构篇 欢迎来到Linux的世界&#xff0c;看笔记好好学多敲多打&#xff0c;每个人都是大神&#xff01; 题目&#xff1a;git2.49.0部署与使用 版本号: 1.0,0 作者: 老王要学习 日期: 2025.05.13 适用环境: Centos7 文档说明 这份文档聚焦于在 CentOS 7 环境下部署和…...

南方科技大学Science! 自由基不对称催化新突破 | 乐研试剂

近日&#xff0c;南方科技大学刘心元教授团队联合浙江大学洪鑫教授团队在自由基不对称催化领域取得新进展。课题组开发了一系列大位阻阴离子 N,N,P-配体&#xff0c;用于铜催化未活化外消旋仲烷基碘与亚砜亚胺的不对称胺化反应。该反应表现出广泛的底物兼容性&#xff0c;涵盖具…...

手机换IP真的有用吗?可以干什么?

在当今数字化时代&#xff0c;网络安全和个人隐私保护日益受到重视。手机作为我们日常生活中不可或缺的工具&#xff0c;其网络活动痕迹往往通过IP地址被记录和追踪。那么&#xff0c;手机换IP真的有用吗&#xff1f;它能为我们带来哪些实际好处&#xff1f;本文将为你一一解答…...

【C++详解】类和对象(上)类的定义、实例化、this指针

文章目录 一、类的定义1、类定义格式2、访问限定符3、类域 二、实例化1、实例化概念2、对象大小 三、this指针 一、类的定义 1、类定义格式 class为定义类的关键字&#xff0c;Stack为类的名字&#xff0c;{}中为类的主体&#xff0c;注意类定义结束时后面分号不能省略。类体中…...

C语言—再学习(数据的存储类别)

在c语言中&#xff0c;每个变量和函数都有两个属性&#xff1a;数据类型和数据的存储类别 C的存储类别包括4种&#xff1a;自动挡&#xff08;auto&#xff09;、静态的&#xff08;static&#xff09;、寄存器的&#xff08;register&#xff09;、外部的&#xff08;extern&…...

软考软件评测师——计算机组成与体系结构(分级存储架构)

一、虚拟存储技术 虚拟存储系统通过软硬件协同实现内存扩展&#xff0c;其核心特征包括&#xff1a; 逻辑容量扩展能力&#xff1a;实际物理内存与外存结合&#xff0c;呈现远大于物理内存的连续地址空间动态加载机制&#xff1a;程序运行时仅加载必要部分到内存&#xff0c;…...

需求跟踪矩阵准确性的5大策略

需求跟踪矩阵的准确性可显著提升软件项目质量&#xff0c;确保需求的全面覆盖、减少遗漏和偏差&#xff0c;有利于优化变更管理&#xff0c;降低返工风险&#xff0c;最终保障产品符合用户预期和业务目标。如果不能保证跟踪矩阵的准确性&#xff0c;可能会导致需求遗漏、测试覆…...

【调度算法】MAPF多智能体路径规划问题

参考链接&#xff1a;https://blog.csdn.net/qq_43353179/article/details/129396325 在这篇博客的基础上对一些省略的部分进行补充。 网站&#xff1a;https://mapf.info/ 可行性判断 1. k-鲁棒性&#xff08;k-robust MAPF&#xff09; 在经典 MAPF 中&#xff0c;只要所有…...

迅龙3号基于兆讯MH22D3适配CST328多点触摸驱动开发笔记

MH22D3芯片是兆讯公司新推出的基于cortex-M3内核的新一代芯片&#xff0c;专注于显示应用&#xff0c;其主频高达216Mhz&#xff0c;64KB SRAM&#xff0c;512KB Flash&#xff0c;开发UI应用游刃有余。详细介绍请看&#xff1a;MH22D3新一代显控应用性价比之王 新龙微基于MH22…...

推荐算法工程化:ZKmall模板商城的B2C 商城的用户分层推荐策略

在 B2C 电商竞争激烈的市场环境中&#xff0c;精准推荐已成为提升用户体验、促进商品销售的关键。ZKmall 模板商城通过推荐算法工程化手段&#xff0c;深度挖掘用户数据价值&#xff0c;制定科学的用户分层推荐策略&#xff0c;实现 “千人千面” 的个性化推荐&#xff0c;帮助…...

你对于JVM底层的理解

JVM&#xff08;Java虚拟机&#xff09;是一个执行Java字节码的虚拟机&#xff0c;负责将Java程序的代码转化为能够在不同操作系统上运行的机器码。为了深入理解JVM的底层工作原理&#xff0c;可以从以下几个方面入手&#xff1a; 1. 类加载机制 JVM的类加载机制是其核心之一…...

深入探讨 Java 性能术语与优化实践

在 Java 开发中,性能优化是确保应用程序高效运行的关键。无论是构建实时处理系统还是大规模分布式服务,理解性能术语和分析方法都至关重要。本文将详细介绍 Java 性能中的核心术语,包括延迟(Latency)、吞吐量(Throughput)、利用率(Utilization)、效率(Efficiency)、…...

简单介绍Qt的属性子系统

深入理解Qt的属性系统 ​ 笔者最近正在大规模的开发Qt的项目和工程&#xff0c;这里笔者需要指出的是&#xff0c;这个玩意在最常规的Qt开发中是相对比较少用的&#xff0c;笔者也只是在Qt的QPropertyAnimation需要动画感知笔者设置的一个属性的时候方才知道这个东西的。因此&…...

【PmHub后端篇】PmHub中基于自定义注解和AOP的服务接口鉴权与内部认证实现

1 引言 在现代软件开发中&#xff0c;尤其是在微服务架构下&#xff0c;服务接口的鉴权和内部认证是保障系统安全的重要环节。本文将详细介绍PmHub中如何利用自定义注解和AOP&#xff08;面向切面编程&#xff09;实现服务接口的鉴权和内部认证&#xff0c;所涉及的技术知识点…...

消息~组件(群聊类型)ConcurrentHashMap发送

为什么选择ConcurrentHashMap&#xff1f; 在开发聊天应用时&#xff0c;我们需要存储和管理大量的聊天消息数据&#xff0c;这些数据会被多个线程频繁访问和修改。比如&#xff0c;当多个用户同时发送消息时&#xff0c;服务端需要同时处理这些消息的存储和查询。如果用普通的…...

掌控随心 - 服务网格的流量管理艺术 (Istio 实例)

掌控随心 - 服务网格的流量管理艺术 (Istio 实例) 想象一下,没有服务网格的时候,我们要实现像“将 1% 的用户流量导入到新版本应用”、“根据用户设备类型访问不同后端”、“模拟下游服务故障”这类高级流量策略,通常需要在代码、负载均衡器、API 网关等多个地方进行复杂且分…...

Github 2025-05-13 Python开源项目日报 Top10

根据Github Trendings的统计&#xff0c;今日(2025-05-13统计)共有10个项目上榜。根据开发语言中项目的数量&#xff0c;汇总情况如下&#xff1a; 开发语言项目数量Python项目10TypeScript项目1 ComfyUI&#xff1a;强大而模块化的稳定扩散GUI 创建周期&#xff1a;399 天开…...

Spring Boot 自动装配原理详解

Spring Boot 的自动装配&#xff08;Auto-Configuration&#xff09;是其核心特性之一&#xff0c;它极大地简化了 Spring 应用的配置过程。通过自动装配&#xff0c;Spring Boot 能够根据项目中的依赖&#xff08;例如&#xff0c;添加了 Spring Data JPA 依赖后自动配置数据库…...

Python核心数据类型全解析:字符串、列表、元组、字典与集合

导读&#xff1a; Python 是一门功能强大且灵活的编程语言&#xff0c;而其核心数据类型是构建高效程序的基础。本文深入剖析了 Python 的五大核心数据类型——字符串、列表、元组、字典和集合&#xff0c;结合实际应用场景与最佳实践&#xff0c;帮助读者全面掌握这些数据类型…...

索尼(sony)摄像机格式化后mp4的恢复方法

索尼(sony)的Alpha 7 Ⅳ系列绝对称的上是索尼的“全画幅标杆机型”&#xff0c;A7M4配备了3300万像素的CMOS&#xff0c;以及全新研发的全画幅背照式Exmor R™CMOS影像传感器&#xff0c;搭载BIONZ XR™影像处理器&#xff0c;与旗舰微单™Alpha 1如出一辙。下面我们来看看A7M4…...

Kubernetes容器运行时:Containerd vs Docker

Containerd 和 Docker 是容器技术领域的两个核心组件&#xff0c;它们在功能定位、架构设计、性能特点及适用场景上有显著差异。以下是两者的详细对比分析&#xff1a; 一、定位与功能 特性DockerContainerd核心定位完整的容器平台&#xff0c;包含构建、运行、编排等全生命周…...

免费专业级 PDF 处理!SolidPDF OCR 识别 + 精准转换批量处理

各位办公小能手们&#xff01;今天咱来聊聊一款超牛的软件——SolidConverterPDF。这可是个专业的多功能PDF处理工具&#xff0c;啥格式转换、文档编辑、扫描识别&#xff0c;它都能搞定&#xff01;下面我就给大伙详细唠唠它的厉害之处。 先说说它的核心功能。 一是PDF格式转换…...

电子电器架构 --- 区域计算架构(Zonal Compute)备战下一代电子电气架构

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 钝感力的“钝”,不是木讷、迟钝,而是直面困境的韧劲和耐力,是面对外界噪音的通透淡然。 生活中有两种人,一种人格外在意别人的眼光;另一种人无论…...

API的学习总结(上)

在 Java 中&#xff0c;API 指的是 Java 提供的一系列类、接口、方法和工具&#xff0c;用于开发 Java 应用程序。Java API 是 Java 平台的核心组成部分&#xff0c;它提供了丰富的功能&#xff0c;包括基础数据类型、集合框架、输入输出、网络编程、多线程、数据库连接等。 核…...

Spring Boot之Web服务器的启动流程分析

如何判断创建哪种web容器&#xff1a;servlet&#xff1f;reactive&#xff1f; 我们在启动Spring Boot程序的时候&#xff0c;会使用SpringApplication.run方法来启动&#xff0c;在启动流程中首先要判断的就是需要启动什么类型的服务器&#xff0c;是servlet&#xff1f;或者…...

代码随想录算法训练营第六十三天| 图论9—卡码网47. 参加科学大会,94. 城市间货物运输 I

每日被新算法方式轰炸的一天&#xff0c;今天是dijkstra&#xff08;堆优化版&#xff09;以及Bellman_ford &#xff0c;尝试理解中&#xff0c;属于是只能照着代码大概说一下在干嘛。 47. 参加科学大会 https://kamacoder.com/problempage.php?pid1047 dijkstra&#xff08…...

RAG之大规模解析 PDF 文档全流程实战

PDF 文档在商业、学术和政府领域无处不在,蕴含着大量宝贵信息。然而,从 PDF 中提取结构化数据却面临着独特的挑战,尤其是在处理数千甚至数百万个文档时。本指南探讨了大规模解析 PDF 的策略和工具。 PDF解析挑战 PDF 的设计初衷是为了提供一致的视觉呈现,而非数据提取。这…...

uart16550详细说明

一、介绍 uart16550 ip core异步串行通信IP连接高性能的微控制器总线AXI,并为异步串行通信提供了 控制接口。软核设计连接了axilite接口。 二、特性 1.axilite接口用于寄存器访问和数据传输 2.16650串口和16450串口的软件和硬件寄存器都是兼容的 3.默认的core配置参数&#xf…...

Docker 环境安装(2025最新版)

Docker在主流的操作系统和云平台上都可以使用&#xff0c;包括Linux操作 系统&#xff08;如Ubuntu、 Debian、Rocky、Redhat等&#xff09;、MacOS操作系统和 Windows操作系统&#xff0c;以及AWS等云平 台。 Docker官网&#xff1a; https://docs.docker.com/ 配置宿主机网…...

Comparator不满足自反性错误,Comparison method violates its general contract

APP运行退出&#xff0c;跟踪信息 java.lang.IllegalArgumentException: Comparison method violates its general contract! Collections.sort(idxsList);//按score升序排列 查看idxs类 public int compareTo(Idxs o) { //重写compareTo方法 return (int) (this.g…...

[Java实战]Spring Boot 3 整合 Apache Shiro(二十一)

[Java实战]Spring Boot 3 整合 Apache Shiro&#xff08;二十一&#xff09; 引言 在复杂的业务系统中&#xff0c;安全控制&#xff08;认证、授权、加密&#xff09;是核心需求。相比于 Spring Security 的重量级设计&#xff0c;Apache Shiro 凭借其简洁的 API 和灵活的扩…...

如何界定合法收集数据?

首席数据官高鹏律师团队 在当今数字化时代&#xff0c;数据的价值日益凸显&#xff0c;而合法收集数据成为了企业、机构以及各类组织必须严守的关键准则。作为律师&#xff0c;深入理解并准确界定合法收集数据的范畴&#xff0c;对于保障各方权益、维护法律秩序至关重要。 一…...

Flask+HTML+Jquery 文件上传下载

HTML 代码&#xff1a; <div id"loadingIndicator" style"display:none;"><div class"spinner"></div> </div> <!-- 请求过程中转圈圈 --> <form action"" method"post" enctype"m…...

MapReduce打包运行

&#xff08;一&#xff09;maven打包 MapReduce是一个分布式运算程序的编程框架&#xff0c;是用户开发“基于Hadoop的数据分析应用”的核心框架。 MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序&#xff08;例如&#xff1a;jar…...

国产化Word处理控件Spire.Doc教程:如何使用 C# 从 Word 中提取图片

通过编程方式从 Word 文档中提取图片&#xff0c;可以用于自动化文档处理任务。E-iceblue旗下Spire系列产品是国产文档处理领域的优秀产品&#xff0c;支持国产化&#xff0c;帮助企业高效构建文档处理的应用程序。本文将演示如何使用 C# 和 Spire.Doc for .NET 库从 Word 文件…...