使用Spark Streaming转换不同的JSON有效负载

使用 Spark Streaming,你只需要从数据源创建一个读流,这样就可以创建写入流将数据加载到目标数据源中。 2021-08-20 16:37:42 SparkSpark Strea 如何在 Node.js 和 Express 中使用 Auth0 译文 了解如何将Auth0登录功能添加到Node.js/Express应用程序,并使用经过身份验证的用户信息显示/隐藏UI信息和安全API。 2021-08-20 16:05:28 JavaScript node.js 应用安全 HarmonyOS服务卡片-防疫一码通 由于软通动力员工基数大,在排队过程中人员密集,对于疫情防控工作有着极大的不利影响。软通动力通过使用HarmonyOS原子化卡片服务原理,将行程码转化为卡片服务,提高了扫码效率。 2021-08-20 14:23:14 鸿蒙HarmonyOS应用 19岁「天才少年」自制CPU!1200个晶体管,纯手工打造 这位叫做 Sam Zeloof 的美国大学生,最终打造出1200个晶体管的CPU! 2021-08-20 11:52:14 CPU芯片计算机 5G扛鼎“新基建”同时也要直面实际难题 5G看上去很美,但现实落地应用的过程中也面临着挑战,具体什么挑战,欢迎一起来看看。 2021-08-20 11:29:45 5G移动通信新基建 NFV关键技术:x86架构基础(下篇) 标准服务器技术是网络功能虚拟化(NFV)实现的一个关键因素,了解一些x86架构的基础知识对大家后续了解电信云关键技术,尤其是掌握虚拟化技术原理和关键优化方案是必须具备的。本文接着上篇从x86架构的中断和异常、IO架构等部分进行阐述讲解。 2021-08-20 11:22:05 X86架构NFV虚拟化 NFV关键技术:X86架构基础(上篇) 本文主要从x86架构的CPU指令集增强,内存管理、中断和异常、IO架构等部分进行阐述,以及包含一些基础IT的基本概念的讲解。 2021-08-20 11:12:31 NFVX86架构地址 高并发整体可用性:细说历经磨难的注册中心选型 本篇将带大家 通过分析一个由Zookeeper引发的全链路服务雪崩的真实案例,来说明注册中心的生产场景诉求和选型原则。 2021-08-20 11:05:14 高并发架构分布式 携程持久化KV存储挑战Redis,狂省90%成本…… 过去几年,携程技术保障部门在Redis治理方面做了很多工作,解决了运营上的问题,在私有云上也积累了丰富的经验。后又通过引入Kvrocks,在公有云上实现降本增效的目的,从而支撑了公司的国际化战略。

使用 Spark Streaming,你只需要从数据源创建一个读流,这样就可以创建写入流将数据加载到目标数据源中。

【清一色.com快译】Spark Streaming 是底层基于 Spark Core 的对大数据进行实时计算的框架,可以流方式从源读取数据。只需要从数据源创建一个读取流,然后我们可以创建写入流以将数据加载到目标数据源中。

[[418750]]

接下来的演示,将假设我们有不同的 JSON 有效负载进入一个 kafka 主题,我们需要将其转换并写入另一个 kafka 主题。

创建一个ReadStream

为了能连续接收JSON有效负载作为消息。我们需要首先读取消息并使用spark的readstream创建数据帧。Spark 中提供了 readStream 函数,我们可以使用这个函数基本上创建一个 readStream。这将从 kafka 主题中读取流负载。

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()

我们可以创建一个 case-class(例如CustomerUnion),它将包含JSON有效负载的所有可能字段。这样,我们就能在数据帧上运行select查询而不会失败。

val rawDfValue = rawData.selectExpr("CAST(value AS STRING)").as[String]

val schema = ScalaReflection.schemaFor[CustomerUnion].dataType.asInstanceOf[StructType]

val extractedDFWithSchema = rawDfValue.select(from_json(col("value"), schema).as("data")).select("data.*")

extractedDFWithSchema.createOrReplaceTempView(“tempView”)

这将为我们提供一个数据帧提取的 DFWithSchema,其中包含作为有效负载字段的列。

示例输入负载

这是两个样本输入有效负载,但也可以有更多的有效负载,有些字段不存在(变量)。

{
“id”: 1234,
“firstName”:”Jon”,
“lastName”:”Butler”,
“City”:”Newyork”,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}

{
“firstName”:”Jon”,
“lastName”:”Butler”,
“City”:”Newyork”,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}

样例输出负载

根据id字段,我们将决定输出有效负载。如果存在一个 id 字段,我们将把它视为一个用户更新案例,并且在输出有效负载中只发送“Email”和“Phone”。我们可以根据某些条件配置任何字段。这只是一个例子。

如果 id 不存在,我们将发送所有字段。下面是两个输出载荷的示例:

{
“userid”: 1234,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}

{
“fullname”:”Jon Butler”,
“City”:”Newyork”,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}

开始WriteStreams

一旦我们有了数据帧,我们就可以运行尽可能多的sql查询,并根据所需的有效负载写入 kafka 主题。因此,我们可以创建一个包含所有sql查询的列表,并通过该列表进行循环,并调用writeStream函数。让我们假设,我们有一个名为 queryList 的列表,它只包含字符串(即sql查询)。

下面为写入流定义的一个函数:

def startWriteStream(query: String): Unit = {

val transformedDf = spark.sql(query)
transformedDf
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()

}

这将启动列表中每个查询的写入流。

queryList.foreach(startWriteStream)
spark.streams.awaitAnyTermination()

如果我们知道输入有效负载的所有可能字段,那么即使有一些字段不存在,我们的sql查询也不会失败。我们已经将有效负载的模式指定为case-class,它将为缺席字段创建指定 NULL 的数据帧。

通过这种方式,我们可以使用 spark-streaming 在所需的转换/过滤器之后将多个有效负载从同一主题写入不同的主题。

【清一色译稿,合作站点转载请注明原文译者和出处为清一色.com】

©本文为清一色官方代发,观点仅代表作者本人,与清一色无关。清一色对文中陈述、观点判断保持中立,不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。本文不作为投资理财建议,请读者仅作参考,并请自行承担全部责任。文中部分文字/图片/视频/音频等来源于网络,如侵犯到著作权人的权利,请与我们联系(微信/QQ:1074760229)。转载请注明出处:清一色财经

(0)
打赏 微信扫码打赏 微信扫码打赏 支付宝扫码打赏 支付宝扫码打赏
清一色的头像清一色管理团队
上一篇 2023年5月5日 18:00
下一篇 2023年5月5日 18:01

相关推荐

发表评论

登录后才能评论

联系我们

在线咨询:1643011589-QQbutton

手机:13798586780

QQ/微信:1074760229

QQ群:551893940

工作时间:工作日9:00-18:00,节假日休息

关注微信