mlink如何用(flink ml)?如果你对这个不了解,来看看!
“魔窗”推出mLink,由营销窗口转型深度链接Deeplink,下面是给大家的分享,一起来看看。
mlink如何用
36 氪在 2015年 报道过 “魔窗”,他们当时在做的事情就是把 App 内的 Banner 变成营销窗口。开发人员使用 SDK 套件在应用安装包内加入相应代码,预留 Banner 位并授权,然后魔窗后台会提供营销活动。
但这样有一个问题,每个应用需要的活动类型千差万别,要做好供应平台,就得源源不断生产优质营销活动。所以,“魔窗” 现在只保留了几个应用较多的爆款活动,其他的模版则由接入的云来、互动推等第三方平台提供。
近日,36 氪与 “魔窗” CTO 张申竣再次进行交流,发现他们推出了 mLink 产品,说白了就是做 DeepLink(深度链接)。
比如说,朋友在微信上发给你某个商品的介绍链接,假如该电商使用了 Deeplink 技术,且你已安装该电商的 app,那你就可以点击链接,跳转到 app,直接进入该商品页面。如果用户没安装 App,在指引到 App Store 下载注册后,商品页面就会找不到。这时就需要Deeplink 的进阶技术——Deferred Deeplink。保证用户下载安装应用后,依旧可以跳转到商品页面。
所以,mLink 就是对要分享的 App 内容页进行上层优化和分装,生成 URL。这样一来,就能替代原生 App 的 H5 页面,开发者有能力从外部渠道引流到功能页,而不是首页。使用时,App 开发者只需要嵌入 SDK,由魔窗帮他分装一些唤醒的代码逻辑,而业务逻辑需要开发者自行完成。
目前,mLink 支持微信直接跳转,在 iOS9 中的 App 若实现了通用链接的话,也可以自动跳转。
魔窗在做的事情,我们此前也有类似产品的报道,例如真格投资的奇点机智(产品 DeepShare)等。
当问及相较同类产品的差别时,张申竣强调,此前魔窗在做的 Banner 营销窗口,积累了大量 App 的广告位,现在可以通过魔窗作为第三方平台进行分发。如果 App 已经有基于短信、微信、电子邮件等渠道,那么魔窗可以直接帮开发者生成短链服务。此外,mLink 还可以做渠道的效果监测,从点击到转发的下载率、转化率。
至于盈利模式,魔窗是卖账号体系。张申竣告知,虽然现在做的事情跟广告接近,但不会走广告变现的路子。
据悉,魔窗的日活已经达到千万级别,客户中不乏携程、火辣健身、美人相机、简拼、我厨网、e 袋洗、大众点评、G 信、网易等。
flink ml
一、概述数据格式:
# 添加{"table":"bms_st.employees","op_type":"I","op_ts":"2012-04-12 14:23:13.177344","current_ts":"2012-04-12T14:23:19.796000","pos":"00000000010000036968","after":{"EID":"101","ENAME":"changyin","ESAL":6666.66}}{"table":"bms_st.employees","op_type":"I","op_ts":"2012-04-12 14:23:13.177344","current_ts":"2012-04-12T14:23:19.797000","pos":"00000000010000037147","after":{"EID":"102","ENAME":"siling","ESAL":1234.12}}# 修改{"table":"bms_st.employees","op_type":"U","op_ts":"2012-04-12 14:24:37.789810","current_ts":"2012-04-12T14:24:44.247000","pos":"00000000010000037501","before":{"EID":"102","ENAME":"siling","ESAL":1234.12},"after":{"EID":"102","ENAME":"sunsiling","ESAL":1000.00}}# 删除{"table":"bms_st.employees","op_type":"D","op_ts":"2012-04-12 14:24:37.789810","current_ts":"2012-04-12T14:24:44.248000","pos":"00000000010000037636","before":{"EID":"101","ENAME":"changyin","ESAL":6666.66}}从数据格式中可以看得出:op_type 是我们对数据源的增删改的标志,真正的数据是在 after 或者 before 的值里边的。接下来我们将用 Flink 对这些数据进行 ETL处理 并发往 kafka 供下一层数仓计算使用:
二、项目结构mmain: 程序入口utils:工具类entity:实体类commonbase:抽象父类achieve:实现类
三、项目的实现3.1 静态的资源文件,用于配置信息 application.properties:# source kafka configPJbtServers1: cdh101:9092,cdh102:9092,cdh103:9092PJgroupId1: testPJoffsetReset1: latestPJtopicStr1: piaoju-topic# sink kafka configpj-BtServers2: cdh101:9092,cdh102:9092,cdh103:9092pj-ZkStr2: cdh101:2181,cdh102:2181,cdh102:2181pj-GroupId2: testpj-OffsetReset2: latestpj-TopicStr2: piaoju-to-kafka-topic# ---------------------------------------------------------------------------------------------------------# 员工日增薪资employee_tb_name: bms_st.employeesemployee_job_name: EmployeeSource#employee_create_table: employee_money#employee_row_col: tb_name VARCHAR, op_type VARCHAR, ts VARCHAR, eId VARCHAR, eName VARCHAR, eSal VARCHAR3.2 在 utils目录 下创建获取以上文件信息值的类 LoadPropertiesFile.java:import java.io.InputStream;import java.util.Properties;public class LoadPropertiesFile { public static String getPropertyFileValues(String proKey){ String proStr = ""; try { //读取配置文件 InputStream is = LoadPropertiesFile.class.getClassLoader().getResourceAsStream("application.properties"); Properties properties = new Properties(); properties.load(is); proStr = properties.getProperty(proKey); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } return proStr; }}3.3 commonbase 目录下创建抽象类 对接kafka的数据,并解析关键字段,代码架构如下:package com.nfdwsyy.commonbase;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.nfdwsyy.utils.LoadPropertiesFile;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;import java.text.ParseException;import java.util.Properties;public abstract class SourceCommonBase { public void getDataStream(String jobName) throws Exception { // 1. 环境的设置 // 2.资源配置文件信息的获取 // 3.消费者接收数据并做json的简要解析 // 4.抽象方法的设置}环境的设置: final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 开启 Checkpoint,每 1000毫秒进行一次 Checkpoint env.enableCheckpointing(1000); // Checkpoint 语义设置为 EXACTLY_ONCE env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // CheckPoint 的超时时间 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间,只允许 有 1 个 Checkpoint 在发生 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 两次 Checkpoint 之间的最小时间间隔为 500 毫秒 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 当 Flink 任务取消时,保留外部保存的 CheckPoint 信息 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 当有较新的 Savepoint 时,作业也会从 Checkpoint 处恢复 env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 作业最多允许 Checkpoint 失败 1 次(flink 1.9 开始支持) env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);2.资源配置文件信息的获取:
// 获取资源配置文件信息 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", LoadPropertiesFile.getPropertyFileValues("PJbtServers1")); properties.setProperty("group.id", LoadPropertiesFile.getPropertyFileValues("PJgroupId1")); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化 properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("auto.offset.reset", LoadPropertiesFile.getPropertyFileValues("PJoffsetReset1")); //value 反序列化3.消费者接收数据并做json的简要解析:
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>( LoadPropertiesFile.getPropertyFileValues("PJtopicStr1"), new SimpleStringSchema(), properties); DataStream<String> stream = env.addSource(myConsumer).setParallelism(1); // prase json DataStream<String> mStream = stream.map(new MapFunction<String, String>() { @Override public String map(String s) throws Exception { JSONObject jsonObject = JSON.parseObject(s); String table = jsonObject.getString("table"); String op_type = jsonObject.getString("op_type"); String op_ts = jsonObject.getString("op_ts"); String before = jsonObject.getString("before"); String after = jsonObject.getString("after"); String resultStr = parseSourceKafkaJson(table,op_type,op_ts,before,after); return resultStr; } }); // let chirld etl to kafka sendToSinkKafka(mStream); env.execute(jobName);4.抽象方法的设置:
// let chirld class do it public abstract String parseSourceKafkaJson(String table, String op_type, String op_ts, String before, String after) throws ParseException; // sink to kafka public abstract void sendToSinkKafka(DataStream<String> mStream);3.4 achieve下创建实现类,用于对数据进行 ETL 处理,类的架构设计如下:package com.nfdwsyy.achieve;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.nfdwsyy.commonbase.SourceCommonBase;import com.nfdwsyy.entity.Employee;import com.nfdwsyy.utils.LoadPropertiesFile;import com.nfdwsyy.utils.MySinkKafka;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.shaded.netty4.io.netty.util.internal.StringUtil;import org.apache.flink.streaming.api.datastream.DataStream;import java.io.Serializable;import java.text.ParseException;/** * @author feiniu * @create 2020-07-23 10:12 */public class EmpSourceAchi extends SourceCommonBase implements Serializable { @Override public String parseSourceKafkaJson(String table, String op_type, String op_ts, String before, String after) throws ParseException { // 1.数据的 ETL 处理 (这里根据实际情况而定) } @Override public void sendToSinkKafka(DataStream<String> mStream) { // 2.将处理完之后的数据发往 kafka 队列 供下游计算使用 } // 3. 调用父类的处理方法,供主类调用 }1.数据的 ETL 处理:
String eId = ""; String eName = ""; double eSal = 0; double after_money = 0; double before_money = 0; JSONObject jObjBefore = JSON.parseObject(before); JSONObject jObjAfter = JSON.parseObject(after); System.out.println("在 parseSourceKafkaJson 方法中,table -> "+ table +" , op_type -> "+ op_type +" , op_ts -> "+ op_ts +" , before -> "+ before + " , after -> "+ after); String tb_name = LoadPropertiesFile.getPropertyFileValues("employee_tb_name"); Employee employee = null; if (StringUtil.isNullOrEmpty(op_type) || StringUtil.isNullOrEmpty(table)){ System.out.println("获取的类型为空哦-> "+ op_type); }else if (table.equals(tb_name)){ switch (op_type){ case "I": eId = jObjAfter.getString("EID"); eName = jObjAfter.getString("ENAME"); eSal = Double.parseDouble(jObjAfter.getString("ESAL")); break; case "U": eId = jObjAfter.getString("EID"); eName = jObjAfter.getString("ENAME"); after_money = Double.valueOf(jObjAfter.getString("ESAL")); before_money = Double.valueOf(jObjBefore.getString("ESAL")); eSal = after_money - before_money; break; case "D": eId = jObjBefore.getString("EID"); eName = jObjBefore.getString("ENAME"); eSal = Double.parseDouble("-"+ jObjBefore.getString("ESAL")); break; } employee = new Employee(tb_name, op_type, op_ts, eId, eName, eSal); } // the entity must have tb_name return JSONObject.toJSONString(employee);2.将处理完之后的数据发往 kafka :
DataStream<String> mS = mStream.filter(new FilterFunction<String>() { @Override public boolean filter(String s) throws Exception { if (StringUtil.isNullOrEmpty(s)){ return false; } else { return true; } } }); String broker_list = LoadPropertiesFile.getPropertyFileValues("pj-BtServers2"); String topic = LoadPropertiesFile.getPropertyFileValues("pj-TopicStr2"); String groupId = LoadPropertiesFile.getPropertyFileValues("pj-GroupId2"); String offsetReset = LoadPropertiesFile.getPropertyFileValues("pj-OffsetReset2"); // the entity must have tb_name String tb_name = LoadPropertiesFile.getPropertyFileValues("employee_tb_name"); // 发往 Kafka 的自定义类 mS.addSink(new MySinkKafka(broker_list, topic, groupId, offsetReset, tb_name)).name("employee_tb_name");调用父类的处理方法,供主类调用 : // transfer the parent method public void successKafka2KafkaMethod(){ try { String jobName = LoadPropertiesFile.getPropertyFileValues("employee_job_name"); getDataStream(jobName +" Source"); } catch (Exception e) { e.printStackTrace(); } }到这里整体上算是弄完了,但是要注意的一点是数据发往 kafka 的类是需要我们去自定义的,接下来我们再去创建一个数据发往 kafka 的工具类:
package com.nfdwsyy.utils;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class MySinkKafka extends RichSinkFunction<String> { private Properties props = null; private KafkaProducer producer = null; private ProducerRecord record = null; private String broker_list; private String topic; private String groupId; private String offsetReset; private String sourceTbName; public MySinkKafka(String broker_list, String topic, String groupId, String offsetReset, String sourceTbName) { this.broker_list = broker_list; this.topic = topic; this.groupId = groupId; this.offsetReset = offsetReset; this.sourceTbName = sourceTbName; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); props = new Properties(); props.put("bootstrap.servers", broker_list); props.put("group.id", groupId); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化 props.put("auto.offset.reset", offsetReset); //value 反序列化 producer = new KafkaProducer<String, String>(props); } @Override public void invoke(String value, Context context) { if(value.equals("") || value.equals("null")) { System.out.println("Sink 中 invoke 方法过来的字符串值-> "+ value); } else { JSONObject jObjNew = JSON.parseObject(value); String tb_name = jObjNew.getString("tb_name"); System.out.println("表明对比 -> " + tb_name + " --- " + sourceTbName); if (tb_name.equals(sourceTbName)) { record = new ProducerRecord<String, String>(topic, null, null, value); producer.send(record); System.out.println("发送数据: " + value); producer.flush(); } } } @Override public void close() throws Exception { super.close(); }}创建主类,调用ETL方法:package com.nfdwsyy.mmain;import com.nfdwsyy.achieve.EmpSourceAchi;public class EmployeeMain01 { public static void main(String[] args){ EmpSourceAchi empAchi = new EmpSourceAchi(); empAchi.successKafka2KafkaMethod(); }}好了,全部代码都写完了,接下来我们可以去测试使用咯。
四、本地测试 并 打包部署上 yarn4.1 本地测试运行程序之后对数据库的源表进行增删改,即可在控制台看到发往kafka的数据,这里不做本地测试。
4.2 部署上 yarn 服务器打包并上传至服务器的指定目录,然后执行如下命令部署应用:
bin/flink run -m yarn-cluster -ynm oggsyncflinkjob -d -c com.nfdwsyy.mmain.EmployeeMain01 /opt/mycdhflink/myjar/Kafka2FlinkETL2Kafka.jar这时候我们可以在页面上部署情况了:
接下来我们再启动接收ETL之后的消费者:
bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic piaoju-to-kafka-topic --from-beginning源库中对表数据操作:
处理过后的数据如下图: